package com.atguigu.flink.datastreamapi.source;

import com.atguigu.flink.function.MyWordCountFlatmapFunction;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Created by Smexy on 2023/11/7
 *
 * Word-count demo that reads its input from a file using the FileSource connector.
 *
 *   1) Requires the flink file-connector dependency.
 *   2) Reads from the local file system or from HDFS
 *      (HDFS access additionally needs hadoop-client on the classpath).
 */
public class Demo3_SourceFromFile
{
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Build a source that reads the target file line by line as Strings.
        FileSource<String> fileSource =
            FileSource.forRecordStreamFormat(new TextLineInputFormat(),
                                             new Path("hdfs://hadoop102:8020/data/ws.json"))
                      .build();

        // Bounded file input — no watermarks needed. "wc" is the source operator's name.
        DataStreamSource<String> lines =
            env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "wc");

        // line -> (word, 1) pairs -> group by the word -> running sum of the counts
        lines.flatMap(new MyWordCountFlatmapFunction())
             .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                 @Override
                 public String getKey(Tuple2<String, Integer> pair) throws Exception {
                     // Key each (word, count) pair by the word itself.
                     return pair.f0;
                 }
             })
             .sum(1)
             .print();

        env.execute();
    }
}
